package com.zk.elect;

import com.google.common.base.Preconditions;
import com.zk.ZkClientHelper;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatch.CloseMode;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Uses the ZooKeeper leader-election recipe (Curator {@link LeaderLatch}) to elect a
 * leader for a named service path and report the election result to a listener.
 *
 * <p>One {@link LeaderLatch} and one watcher thread are kept per service name. The
 * registries are static, so they are shared by every caller in the JVM.
 *
 * @author 007
 */
public class ZkServiceLocator {
    private static final Logger log = LoggerFactory.getLogger(ZkServiceLocator.class);

    /** Path segment under {@code ZkClientHelper.BASE_PATH} that holds all election nodes. */
    private static final String FUNC_MID_PATH = "servicelocate";
    private static final String FUNC_FULL_PATH = ZKPaths.makePath(ZkClientHelper.BASE_PATH, FUNC_MID_PATH);

    /** Seconds the watcher thread sleeps between checks while this participant is leader. */
    private static final long LEADER_RECHECK_SECONDS = 10L;

    /** Active latches, keyed by service name. */
    private static final Map<String, LeaderLatch> latchMaps = new ConcurrentHashMap<String, LeaderLatch>();
    /** Watcher threads, keyed by service name. */
    private static final Map<String, Thread> threadMaps = new ConcurrentHashMap<String, Thread>();

    /** Local host description, used only in log messages. */
    private static final String HOSTIP_STR = getLocalHostIP();

    public static void main(String[] args) {
        System.out.println("ssssssssssss");
    }

    /**
     * Not instantiable from outside; use {@link #getInstance()}.
     */
    private ZkServiceLocator() {
    }

    /**
     * Holder whose static initializer creates the single instance.
     */
    private static class ServiceLocatorHolder {
        private static final ZkServiceLocator instance = new ZkServiceLocator();
    }

    /**
     * Returns the shared locator instance.
     *
     * @return the singleton {@code ZkServiceLocator}
     */
    public static ZkServiceLocator getInstance() {
        return ServiceLocatorHolder.instance;
    }

    /**
     * Joins the leader election for {@code serviceName} and starts a watcher thread
     * named after the service.
     *
     * <p>If an election for the same service name is already registered, it is stopped
     * first (its latch is closed and its watcher thread interrupted) so that the old
     * latch does not keep participating unnoticed.
     *
     * @param serviceName   election name; must not be null and must not contain '/'
     * @param electListener listener notified by the latch on leadership changes; must not be null
     * @throws NullPointerException     if either argument is null
     * @throws IllegalArgumentException if {@code serviceName} contains '/'
     * @throws Exception                if the latch cannot be started
     */
    public void startElect(String serviceName, ZkServiceElectListener electListener) throws Exception {
        Preconditions.checkNotNull(serviceName, "serviceName should not be null");
        Preconditions.checkNotNull(electListener, "ZkServiceElectListener should not be null");
        // '/' is rejected at every position, including a leading one (index 0).
        Preconditions.checkArgument(serviceName.indexOf('/') < 0, "serviceName should not contains char '/'");

        log.debug("StartElect:[{}]", serviceName);

        if (latchMaps.containsKey(serviceName) || threadMaps.containsKey(serviceName)) {
            log.warn("Elect Service:{} is already started, restarting it", serviceName);
            release(serviceName);
        }

        // NOTE(review): every participant registers with the same literal id "id";
        // confirm whether a per-host id was intended.
        LeaderLatch leaderLatch = new LeaderLatch(ZkClientHelper.getZkClient(),
                ZKPaths.makePath(FUNC_FULL_PATH, serviceName), "id", CloseMode.NOTIFY_LEADER);

        leaderLatch.addListener(electListener);
        leaderLatch.start();
        latchMaps.put(serviceName, leaderLatch);

        Thread thread = new Thread(new ElectionWatcher(serviceName, leaderLatch), serviceName);
        thread.start();
        threadMaps.put(serviceName, thread);
    }

    /**
     * Withdraws from the election for {@code serviceName}. Does nothing beyond logging
     * when no election is registered under that name.
     *
     * @param serviceName election name; must not be null
     * @throws NullPointerException if {@code serviceName} is null
     */
    public void stopElect(String serviceName) {
        Preconditions.checkNotNull(serviceName, "serviceName should not be null");
        log.warn("Stop Elect Service:{}", serviceName);
        try {
            release(serviceName);
        } catch (RuntimeException e) {
            log.error("Stop Elect Service:{},ERROR:", serviceName, e);
        }
    }

    /**
     * Withdraws from every registered election. A failure on one service is logged and
     * does not prevent the remaining services from being stopped.
     */
    public void stopElectAll() {
        log.warn("Stop All Elect Service");
        // Snapshot the keys of both registries so entries can be removed while iterating.
        List<String> keyList = new ArrayList<String>(latchMaps.keySet());
        keyList.addAll(threadMaps.keySet());

        for (String key : keyList) {
            try {
                release(key);
            } catch (RuntimeException e) {
                log.error("Stop All Elect Service:{},ERROR:", key, e);
            }
        }
    }

    /**
     * Returns the local host as rendered by {@code InetAddress.toString()}, or an empty
     * string when the local host cannot be resolved.
     *
     * @return the local host description, never null
     */
    public static String getLocalHostIP() {
        try {
            return java.net.InetAddress.getLocalHost().toString();
        } catch (UnknownHostException e) {
            return "";
        }
    }

    /**
     * Unregisters and closes the latch for {@code serviceName}, then unregisters and
     * interrupts its watcher thread. Safe to call when nothing is registered.
     */
    private void release(String serviceName) {
        // Remove the latch before interrupting so the watcher sees it is no longer registered.
        LeaderLatch latch = latchMaps.remove(serviceName);
        if (latch != null) {
            log.warn("Close LeaderLatch:{}", serviceName);
            CloseableUtils.closeQuietly(latch);
        }

        Thread thread = threadMaps.remove(serviceName);
        if (thread != null) {
            log.warn("Close ElectThread:{}", serviceName);
            thread.interrupt();
        }
    }

    /**
     * Watcher loop for one election: waits on the latch and logs leadership state until
     * the latch is unregistered or closed, or the thread is interrupted.
     */
    private static final class ElectionWatcher implements Runnable {
        private final String serviceName;
        private final LeaderLatch latch;

        ElectionWatcher(String serviceName, LeaderLatch latch) {
            this.serviceName = serviceName;
            this.latch = latch;
        }

        @Override
        public void run() {
            // Keep running only while this watcher's own latch is the registered one;
            // a stop or a restart of the service replaces or removes the entry.
            while (latchMaps.get(serviceName) == latch) {
                try {
                    log.debug(">>>>>> Thread[{}] await in.", serviceName);
                    // Returns once this participant holds leadership.
                    latch.await();
                    boolean isLeader = latch.hasLeadership();
                    log.debug("{} isLeader={}", serviceName, isLeader);
                    if (isLeader) {
                        // await() returns immediately while leader, so pause between checks.
                        TimeUnit.SECONDS.sleep(LEADER_RECHECK_SECONDS);
                    }
                    log.debug("<<<<<< Thread[{}] await out.", serviceName);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal; looping again with the flag set
                    // would spin without ever blocking.
                    Thread.currentThread().interrupt();
                    log.debug("Thread[{}] interrupted, thread return", serviceName);
                    return;
                } catch (java.io.EOFException e) {
                    // Thrown by await() when the latch is no longer started (closed).
                    log.debug("{} leaderLatch is closed,thread return", serviceName);
                    return;
                } catch (RuntimeException e) {
                    log.error("Client[{}], thread:[{}] get exception.", HOSTIP_STR, serviceName, e);
                }
            }
            log.debug("{} leaderLatch is null,thread return", serviceName);
        }
    }

}
